package com.mai.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mai.realtime.app.BaseAppV1;
import com.mai.realtime.bean.UserRegisterBean;
import com.mai.realtime.common.Constant;
import com.mai.realtime.util.AtguiguUtil;
import com.mai.realtime.util.FlinkSinkUtil;
import com.mai.realtime.util.FlinkSourceUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class Dws_02_02_DwsUserUserRegisterWindow extends BaseAppV1 {
    public static void main(String[] args) throws Exception {
        new Dws_02_02_DwsUserUserRegisterWindow().init(
                20002,
                2,
                "Dws_02_02_DwsUserUserRegisterWindow",
                Constant.TOPIC_DWD_USER_REGISTER
        );
    }

    @Override
    protected void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {

        // TODO 4. 转换数据结构
        SingleOutputStreamOperator<JSONObject> mappedStream = stream.map(JSON::parseObject);

        // TODO 5. 设置水位线
        SingleOutputStreamOperator<JSONObject> withWatermarkDS = mappedStream.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<JSONObject>() {
                                    @Override
                                    public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
                                        return jsonObj.getLong("ts") * 1000L;
                                    }
                                }
                        )
        );

        // TODO 6. 开窗
        AllWindowedStream<JSONObject, TimeWindow> windowDS = withWatermarkDS.windowAll(TumblingEventTimeWindows.of(
                org.apache.flink.streaming.api.windowing.time.Time.seconds(10L)));

        // TODO 7. 聚合
        SingleOutputStreamOperator<UserRegisterBean> aggregateDS = windowDS.aggregate(
                new AggregateFunction<JSONObject, Long, Long>() {
                    @Override
                    public Long createAccumulator() {
                        return 0L;
                    }

                    @Override
                    public Long add(JSONObject jsonObj, Long accumulator) {
                        accumulator += 1;
                        return accumulator;
                    }

                    @Override
                    public Long getResult(Long accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Long merge(Long a, Long b) {
                        return null;
                    }
                },
                new AllWindowFunction<Long, UserRegisterBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<Long> values, Collector<UserRegisterBean> out) throws Exception {
                        for (Long value : values) {
                            String stt = AtguiguUtil.toDateTime(window.getStart());
                            String edt = AtguiguUtil.toDateTime(window.getEnd());
                            String curDate = AtguiguUtil.toDate(window.getStart());
                            UserRegisterBean userRegisterBean = new UserRegisterBean(
                                    stt,
                                    edt,
                                    curDate,
                                    value
                            );
                            out.collect(userRegisterBean);
                        }
                    }
                }
        );

        // TODO 8. 写入到 OLAP 数据库
        aggregateDS.map(JSON::toJSONString).print();
//                .addSink(FlinkSinkUtil.getDorisSink("gmall.dws_user_user_register_window"));
    }
}
